/*
 * Copyright (c) 2020 - present, Inspur Genersoft Co., Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.iec.edp.caf.rpc.remote.http.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.iec.edp.caf.commons.core.SerializerFactory;
import io.iec.edp.caf.commons.core.enums.SerializeType;
import io.iec.edp.caf.commons.exception.CAFRuntimeException;
import io.iec.edp.caf.commons.exception.entity.DefaultExceptionProperties;
import io.iec.edp.caf.commons.exception.entity.ExceptionErrorCode;
import io.iec.edp.caf.commons.exception.ExceptionLevel;
import io.iec.edp.caf.commons.utils.SpringBeanUtils;
import io.iec.edp.caf.commons.utils.StringUtils;
import io.iec.edp.caf.rpc.api.common.GspSerializeType;
import io.iec.edp.caf.rpc.api.entity.RpcContext;
import io.iec.edp.caf.rpc.api.entity.RpcServiceMethodDefinition;
import io.iec.edp.caf.rpc.api.event.RpcServerEventBroker;
import io.iec.edp.caf.rpc.api.serialize.RpcSerializeUtil;
import io.iec.edp.caf.rpc.api.service.InternalServiceManageService;
import io.iec.edp.caf.rpc.api.service.RpcServer;
import io.iec.edp.caf.rpc.api.spi.RpcContextResolver;
import io.iec.edp.caf.rpc.api.spi.RpcContextTransmittee;
import io.iec.edp.caf.rpc.api.support.ConstanceVarible;
import io.iec.edp.caf.rpc.api.utils.Utils;
import io.iec.edp.caf.rpc.remote.http.api.RpcServiceApi;
import io.iec.edp.caf.rpc.remote.http.entity.RpcStreamParam;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * @author Leon Huo
 * @Date: 2021/5/31
 */
@Slf4j
public class RpcServiceApiImpl implements RpcServiceApi {

    private RpcServer rpcServer;

    private RpcServerEventBroker serverEventBroker;

//    private ICAFContextService cafContextService;

    private InternalServiceManageService management=SpringBeanUtils.getBean(InternalServiceManageService.class);

//    private BizContextManager contextManager = SpringBeanUtils.getBean(BizContextManager.class);

    public RpcServiceApiImpl(RpcServer rpcServer, RpcServerEventBroker serverEventBroker) {
        this.rpcServer = rpcServer;
        this.serverEventBroker = serverEventBroker;
    }

//    public RpcServiceApiImpl(RpcServer rpcServer, RpcServerEventBroker serverEventBroker, ICAFContextService cafContextService) {
//        this.rpcServer = rpcServer;
//
//        this.serverEventBroker = serverEventBroker;
//
//        this.cafContextService=cafContextService;
//
//    }

    /**
     * rpc server rest api for normal invoke
     * invoke rpc service
     * @param serviceId service id
     * @param version   service version
     * @param paramDict service parameters
     * @return
     * @throws ClassNotFoundException
     * @throws NoSuchMethodException
     * @throws InvocationTargetException
     * @throws InstantiationException
     * @throws IllegalAccessException
     * @throws JsonProcessingException
     * @throws JsonProcessingException
     */
    @Override
    public String invoke(String serviceId, String version, LinkedHashMap<String, String> paramDict) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, JsonProcessingException, JsonProcessingException {
        ServletRequestAttributes servletRequestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = servletRequestAttributes.getRequest();
        HttpServletResponse response = servletRequestAttributes.getResponse();
        //表示响应的内容区数据的媒体类型为json格式，且编码为utf-8(客户端应该以utf-8解码)
        response.setContentType("application/json;charset=utf-8");
        response.addHeader(ConstanceVarible.GSP_RPC_SERVER, "j");


        var contextStr = request.getHeader(ConstanceVarible.GSP_CONTEXT);
        var gspContext = (HashMap<String, String>) SerializerFactory.getDeserializer(SerializeType.Json).deserialize(contextStr, HashMap.class);
        if (!gspContext.containsKey(ConstanceVarible.GSP_RPC))
            throw new CAFRuntimeException(DefaultExceptionProperties.SERVICE_UNIT,
                    DefaultExceptionProperties.RESOURCE_FILE,
                    ExceptionErrorCode.illegalRPCCall,
                    new String[]{},
                    null, ExceptionLevel.Error, false);
        var value = gspContext.get(ConstanceVarible.GSP_RPC);
        if (value == null || value.length() == 0 || !value.equals("true"))
            throw new CAFRuntimeException(DefaultExceptionProperties.SERVICE_UNIT,
                    DefaultExceptionProperties.RESOURCE_FILE,
                    ExceptionErrorCode.illegalRPCCall,
                    new String[]{},
                    null, ExceptionLevel.Error, false);

        //var contextId = gspContext.get("ContextId");
        var serviceUnitName = gspContext.get(ConstanceVarible.GSP_MSU);
        String tenantIdString = gspContext.get(ConstanceVarible.GSP_RPC_TENANT);
        Integer tenantId = !StringUtils.isEmpty(tenantIdString) ? Integer.valueOf(tenantIdString) : null;

        replayContext(gspContext,tenantId);

        gspContext.remove(ConstanceVarible.GSP_RPC);
        gspContext.remove(ConstanceVarible.GSP_CONTEXT_ID);
        gspContext.remove(ConstanceVarible.GSP_MSU);

        var eventContext = gspContext.get(ConstanceVarible.GSP_RPC_CLIENT_ENVENT);
        var eventContextDict = SerializerFactory.getDeserializer(SerializeType.Json).deserialize(eventContext, HashMap.class);
        var localDict = new HashMap<String, Object>();
        localDict.put(ConstanceVarible.LOG_MSU, serviceUnitName);
        var methodParams = new HashMap<String, Object>();
        for (Map.Entry<String, String> entry : paramDict.entrySet()) {
            methodParams.put(entry.getKey(), entry.getValue());
        }
        this.serverEventBroker.firePreRpcInvokeEvent(eventContextDict, methodParams, localDict);
        String result;

        try {

            var start = System.currentTimeMillis();
//            var newParams = getResponseStreamParam(null,paramDict);
            LinkedHashMap<String,Object> newParams = new LinkedHashMap<>();
            newParams.putAll(paramDict);
            //反序列化参数
            var intermanager = SpringBeanUtils.getBean(InternalServiceManageService.class);
            var rpcServiceMethodDefinition = intermanager.getRpcMethodDefinition(serviceId);
            Object[] paramArray = RpcSerializeUtil.deSerializeParameter(rpcServiceMethodDefinition, newParams);

            result = (String) rpcServer.invokeService(serviceId, version, paramArray, GspSerializeType.Json);
            var end = System.currentTimeMillis();
            log.info("rpc remote invoke server time：{} ms",end-start);

        } catch (Exception e) {
            serverEventBroker.fireExceptionRpcInvokeEvent(eventContextDict, methodParams, localDict, e);
            throw e;
        } finally {
            restoreContext(null,tenantId);
        }

        var contextDict = new HashMap<String, String>();
        serverEventBroker.firePostRpcInvokeEvent(contextDict, result, localDict);
        var eventContextStr = SerializerFactory.getDeserializer(SerializeType.Json).serializeToString(contextDict);
        response.addHeader(ConstanceVarible.GSP_RPC_SERVER_ENVENT, eventContextStr);

        return result;
    }

    /**
     * rpc server rest api for response file,stream... invoke
     * invoke rpc service
     * @param serviceId service id
     * @param version   service version
     * @param paramDict service parameters
     * @return
     * @throws ClassNotFoundException
     * @throws NoSuchMethodException
     * @throws InvocationTargetException
     * @throws InstantiationException
     * @throws IllegalAccessException
     * @throws JsonProcessingException
     * @throws JsonProcessingException
     */
    @SneakyThrows
    @Override
    public Object invokeStream(String serviceId, String version, LinkedHashMap<String, String> paramDict) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, JsonProcessingException {

        ServletRequestAttributes servletRequestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = servletRequestAttributes.getRequest();
        HttpServletResponse response = servletRequestAttributes.getResponse();
        //表示响应的内容区数据的媒体类型为二进制流式格式，且编码为utf-8(客户端应该以utf-8解码)
        response.setContentType("application/octet-stream;charset=utf-8");
        response.addHeader(ConstanceVarible.GSP_RPC_SERVER, "j");

        var contextStr = request.getHeader(ConstanceVarible.GSP_CONTEXT);
        var gspContext = (HashMap<String, String>) SerializerFactory.getDeserializer(SerializeType.Json).deserialize(contextStr, HashMap.class);
        if (!gspContext.containsKey(ConstanceVarible.GSP_RPC))
            throw new CAFRuntimeException(DefaultExceptionProperties.SERVICE_UNIT,
                    DefaultExceptionProperties.RESOURCE_FILE,
                    ExceptionErrorCode.illegalRPCCall,
                    new String[]{},
                    null, ExceptionLevel.Error, false);
        var value = gspContext.get(ConstanceVarible.GSP_RPC);
        if (value == null || value.length() == 0 || !value.equals("true"))
            throw new CAFRuntimeException(DefaultExceptionProperties.SERVICE_UNIT,
                    DefaultExceptionProperties.RESOURCE_FILE,
                    ExceptionErrorCode.illegalRPCCall,
                    new String[]{},
                    null, ExceptionLevel.Error, false);

        //var contextId = gspContext.get("ContextId");
        var serviceUnitName = gspContext.get(ConstanceVarible.GSP_MSU);
        String tenantIdString = gspContext.get(ConstanceVarible.GSP_RPC_TENANT);
        Integer tenantId = !StringUtils.isEmpty(tenantIdString) ? Integer.valueOf(tenantIdString) : null;

        replayContext(gspContext,tenantId);

        gspContext.remove(ConstanceVarible.GSP_RPC);
        gspContext.remove(ConstanceVarible.GSP_CONTEXT_ID);
        gspContext.remove(ConstanceVarible.GSP_MSU);

        var eventContext = gspContext.get(ConstanceVarible.GSP_RPC_CLIENT_ENVENT);
        var eventContextDict = SerializerFactory.getDeserializer(SerializeType.Json).deserialize(eventContext, HashMap.class);
        var localDict = new HashMap<String, Object>();
        localDict.put(ConstanceVarible.LOG_MSU, serviceUnitName);

        var methodParams = new HashMap<String, Object>();
        for (Map.Entry<String, String> entry : paramDict.entrySet()) {
            methodParams.put(entry.getKey(), entry.getValue());
        }
        this.serverEventBroker.firePreRpcInvokeEvent(eventContextDict, methodParams, localDict);

        Object result;

        try {

            var start = System.currentTimeMillis();
            //use stream invoke,then set the service target method first parameter to be Response
            //use response the target method can set stream data to response output stream

            var contextDict = new HashMap<String, String>();
            serverEventBroker.firePostRpcInvokeEvent(contextDict, null, localDict);
            var eventContextStr = SerializerFactory.getDeserializer(SerializeType.Json).serializeToString(contextDict);
            response.addHeader(ConstanceVarible.GSP_RPC_SERVER_ENVENT, eventContextStr);

            var newParams = new LinkedHashMap<String,Object>();
            RpcServiceMethodDefinition serviceMethodDefinition = this.management.getRpcMethodDefinition(serviceId);
            if(serviceMethodDefinition.getParameters()!=null
                    &&serviceMethodDefinition.getParameters().size()>0
                    &&serviceMethodDefinition.getParameters().get(0).getType().getClassName()!=null
                    &&serviceMethodDefinition.getParameters().get(0).getType().getClassName().equals(HttpServletResponse.class.getTypeName())) {
                newParams = getResponseStreamParam(response,paramDict);
            }
            else{
                newParams.putAll(paramDict);
            }

            //反序列化参数
            var intermanager = SpringBeanUtils.getBean(InternalServiceManageService.class);
            var rpcServiceMethodDefinition = intermanager.getRpcMethodDefinition(serviceId);
            Object[] paramArray = RpcSerializeUtil.deSerializeParameter(rpcServiceMethodDefinition, newParams);
            if(serviceMethodDefinition.getParameters()!=null&&serviceMethodDefinition.getParameters().size()>0&&serviceMethodDefinition.getParameters().get(0).getType().getClassName()!=null&&serviceMethodDefinition.getParameters().get(0).getType().getClassName().equals(HttpServletResponse.class.getTypeName()))
                result = rpcServer.invokeService(serviceId, version, paramArray,GspSerializeType.Json);
            else
                result = rpcServer.invokeServiceStream(serviceId, version, paramArray);
            var end = System.currentTimeMillis();
            log.info("rpc remote invoke server time：{} ms",end-start);

        } catch (Exception e) {
            serverEventBroker.fireExceptionRpcInvokeEvent(eventContextDict, methodParams, localDict, e);
            throw e;
        } finally {
            restoreContext(null,tenantId);
        }
        return result;
    }

    /**
     * rpc server rest api for request file,stream... invoke
     * invoke rpc service
//     * @param rpcStreamParam {@link RpcStreamParam} RpcStreamParam
     * @return
     * @throws ClassNotFoundException
     * @throws NoSuchMethodException
     * @throws InvocationTargetException
     * @throws InstantiationException
     * @throws IllegalAccessException
     * @throws JsonProcessingException
     * @throws JsonProcessingException
     */
    @Override
    public Object invokeStream(String serviceId,
                               String version,
                               LinkedHashMap<String, String> paramDict,
                               InputStream inputStream) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, IOException {
//        var serviceId =  (String)rpcStreamParam.getAttachmentObject("serviceid",String.).getObject();//rpcStreamParam.getServiceId();
//        var version =  (String)rpcStreamParam.getAttachment("version").getObject();//rpcStreamParam.getVersion();
//        var paramDict = (LinkedHashMap<String,String>)rpcStreamParam.getAttachment("params").getObject();//new LinkedHashMap<String,String>();//rpcStreamParam.getParamDict();
//        var inputstream = rpcStreamParam.getAttachment("inputstream").getDataHandler().getInputStream();

        ServletRequestAttributes servletRequestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = servletRequestAttributes.getRequest();
        HttpServletResponse response = servletRequestAttributes.getResponse();
        //表示响应的内容区数据的媒体类型为json格式，且编码为utf-8(客户端应该以utf-8解码)
        response.setContentType("application/json;charset=utf-8");
        response.addHeader(ConstanceVarible.GSP_RPC_SERVER, "j");


        var contextStr = request.getHeader(ConstanceVarible.GSP_CONTEXT);
        var gspContext = (HashMap<String, String>) SerializerFactory.getDeserializer(SerializeType.Json).deserialize(contextStr, HashMap.class);
        if (!gspContext.containsKey(ConstanceVarible.GSP_RPC))
            throw new CAFRuntimeException(DefaultExceptionProperties.SERVICE_UNIT,
                    DefaultExceptionProperties.RESOURCE_FILE,
                    ExceptionErrorCode.illegalRPCCall,
                    new String[]{},
                    null, ExceptionLevel.Error, false);
        var value = gspContext.get(ConstanceVarible.GSP_RPC);
        if (value == null || value.length() == 0 || !value.equals("true"))
            throw new CAFRuntimeException(DefaultExceptionProperties.SERVICE_UNIT,
                    DefaultExceptionProperties.RESOURCE_FILE,
                    ExceptionErrorCode.illegalRPCCall,
                    new String[]{},
                    null, ExceptionLevel.Error, false);

        //var contextId = gspContext.get("ContextId");
        var serviceUnitName = gspContext.get(ConstanceVarible.GSP_MSU);
        String tenantIdString = gspContext.get(ConstanceVarible.GSP_RPC_TENANT);
        Integer tenantId = !StringUtils.isEmpty(tenantIdString) ? Integer.valueOf(tenantIdString) : null;

        replayContext(gspContext,tenantId);

        gspContext.remove(ConstanceVarible.GSP_RPC);
        gspContext.remove(ConstanceVarible.GSP_CONTEXT_ID);
        gspContext.remove(ConstanceVarible.GSP_MSU);


        var eventContext = gspContext.get(ConstanceVarible.GSP_RPC_CLIENT_ENVENT);
        var eventContextDict = SerializerFactory.getDeserializer(SerializeType.Json).deserialize(eventContext, HashMap.class);
        var localDict = new HashMap<String, Object>();
        localDict.put(ConstanceVarible.LOG_MSU, serviceUnitName);
        var methodParams = new HashMap<String, Object>();
        for (Map.Entry<String, String> entry : paramDict.entrySet()) {
            methodParams.put(entry.getKey(), entry.getValue());
        }
        this.serverEventBroker.firePreRpcInvokeEvent(eventContextDict, methodParams, localDict);
        String result;

        try {

            var start = System.currentTimeMillis();
            var intermanager = SpringBeanUtils.getBean(InternalServiceManageService.class);
            var rpcServiceMethodDefinition = intermanager.getRpcMethodDefinition(serviceId);
//            var newParams = getRequstStreamParam(inputStream,paramDict);
            LinkedHashMap<String,Object> params = new LinkedHashMap<>();
            params.putAll(paramDict);
            var newParams = Utils.handleStreamParam(rpcServiceMethodDefinition,params,inputStream);
            //反序列化参数
            Object[] paramArray = RpcSerializeUtil.deSerializeParameter(rpcServiceMethodDefinition, newParams);

            result = (String) rpcServer.invokeService(serviceId, version, paramArray, GspSerializeType.Json);
            var end = System.currentTimeMillis();
            log.info("rpc remote invoke server time：{} ms",end-start);

        } catch (Exception e) {
            serverEventBroker.fireExceptionRpcInvokeEvent(eventContextDict, methodParams, localDict, e);
            throw e;
        } finally {
            restoreContext(null,tenantId);
        }

        var contextDict = new HashMap<String, String>();
        serverEventBroker.firePostRpcInvokeEvent(contextDict, result, localDict);
        var eventContextStr = SerializerFactory.getDeserializer(SerializeType.Json).serializeToString(contextDict);
        response.addHeader(ConstanceVarible.GSP_RPC_SERVER_ENVENT, eventContextStr);

        return result;

    }

    private LinkedHashMap<String,Object> getResponseStreamParam(HttpServletResponse response, LinkedHashMap<String, String> paramDict){
        var newParams = new LinkedHashMap<String,Object>();

        if(response!=null){
            newParams.put(ConstanceVarible.RPC_HTTP_RESPONSE,response);
        }

        for(var p:paramDict.keySet()){
            newParams.put(p,paramDict.get(p));
        }

        return newParams;
    }


    /**
     * replay context
     */
    private void replayContext(HashMap<String, String> gspContext,Integer tenantId) {
        RpcContext rpcContext = RpcContext.getInstance();
        //todo 这段逻辑不确定是否被使用
//        if(gspContext.get(ConstanceVarible.BIZ_CONTEXT_ID)!=null && gspContext.get(ConstanceVarible.BIZ_CONTEXT_ID).length()>0){
//            rpcContext.putContext("transmitte-bizcontextid",gspContext.get(ConstanceVarible.BIZ_CONTEXT_ID));
//            RpcContextResolver.replay(rpcContext,
//                    RpcContextTransmittee.TransmitteType.BIZCONTEXT.getValue());
//        }

        var serviceUnitName = gspContext.get(ConstanceVarible.GSP_MSU);
        rpcContext.putContext("transmitte-msu",serviceUnitName);
        RpcContextResolver.replay(rpcContext,
                RpcContextTransmittee.TransmitteType.MSU.getValue());

        if (tenantId != null) {
            rpcContext.putContext("transmitte-tenantid",tenantId);
//            rpcContext.putContext("transmitte-check-current-tenantid",false);
            RpcContextResolver.replay(rpcContext,
                    RpcContextTransmittee.TransmitteType.TENANT.getValue());
        }
    }

    /**
     * resotre context
     */
    private void restoreContext(HashMap<String, String> gspContext,Integer tenantId) {
//        if (tenantId != null) {
//            RequestTenantContextHolder.restore();
//        }

        RpcContext rpcContext = RpcContext.getInstance();
        rpcContext.putContext("transmitte-tenantid",tenantId);
        if (tenantId != null) {
            RpcContextResolver.restore(rpcContext,
                    RpcContextTransmittee.TransmitteType.TENANT.getValue());
        }
    }
}
